#ifndef _IOT_QBUS_H_
#define _IOT_QBUS_H_ 1

#ifdef __cplusplus
extern "C" {
#endif

#define INVALID_SEQ 0
#define IS_CALL(seq) seq != INVALID_SEQ

#define TOPIC_ALL ""

struct qbus{
    void *priv;
    void (*free)(struct qbus *bus);
    /**
     * @param bus
     * @param topic
     * @param payload
     * @param payload_len
     * @return 0(ok) !0(failed)
     */
    int (*publish)(struct qbus *bus, const char *topic, const char *payload, unsigned int payload_len);
    int (*publish_ext)(struct qbus *bus, const char *topic, const char *header, unsigned int header_len, const char *payload, unsigned int payload_len);
    /**
     * @param bus
     * @param topic
     * @param payload
     * @param payload_len
     * @param resp
     * @param resp_len
     * @param resp_act_len
     * @param timeout: ms
     * @return 0(ok) !0(failed)
     */
    int (*call)(struct qbus *bus, const char *topic, const char *payload, unsigned int payload_len,
                      char *resp, unsigned int resp_len, unsigned int *resp_act_len, unsigned int timeout);
};

struct qbus *create_qbus(unsigned int limit);

typedef void (*qreader)(const char *topic, unsigned int topic_len, const void *payload, unsigned int payload_len, unsigned int seq, void *args);

struct qconsumer {
    void *priv;
    void (*free)(struct qconsumer *consumer);
    /**
     * @param consumer
     * @param topic
     * @return 0(ok) !0(failed)
     */
    int (*subscribe)(struct qconsumer *consumer, const char *topic);
    /**
     * @param consumer
     * @param topic
     * @return 0(ok) !0(failed)
     */
    int (*unsubscribe)(struct qconsumer *consumer, const char *topic);
    /**
     * @param consumer
     * @param topic
     * @param topic_len
     * @param payload
     * @param payload_len
     * @param act_payload_len
     * @param out_seq : use IS_CALL(out_seq) to check. if ture, call @send_resp
     * @return 0(ok) !0(failed)
     */
    int (*read)(struct qconsumer *consumer, char *topic, unsigned topic_len,
                      char *payload, unsigned payload_len, unsigned int *act_payload_len, unsigned int *out_seq);
    /**
     * DO NOT call bus or consumer functions, it will cause dead lock!
     * @param consumer
     * @param reader
     * @param args
     * @return
     */
    int (*read_map)(struct qconsumer *consumer, qreader reader, void *args);
    int (*send_resp)(struct qconsumer *consumer, unsigned int seq, const char *payload, unsigned int payload_len);
};
struct qconsumer* create_consumer(struct qbus *bus);

// @return 0: not matched, !0: matched
typedef int (*consumer_filter_t)(const char *topic, unsigned int topic_len,
                const char *msg_topic, unsigned int msg_topic_len, const char *payload, unsigned int payload_len);
struct qconsumer* create_consumer_with_filter(struct qbus *bus, consumer_filter_t filter);

typedef void (*qdispatcher_cb)(unsigned int id, const void *payload, unsigned int payload_len, void *args);

struct qdispatcher {
    void *priv;
    void (*free)(struct qdispatcher *qd);
    void (*broadcast)(struct qdispatcher *qd, unsigned int id, const void *data, unsigned int data_len);
    void (*add_listener)(struct qdispatcher *qd, unsigned int id, qdispatcher_cb cb, void *args);
    void (*remove_listener)(struct qdispatcher *qd, unsigned int id, qdispatcher_cb cb);
};
struct qdispatcher* create_qdispatcher(struct qbus *bus);

#ifdef __cplusplus
};
#endif

#endif /* _IOT_QBUS_H_ */